/*
/*-
 * <<
 * Moonbox
 * ==
 * Copyright (C) 2016 - 2019 EDP
 * ==
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * >>
 */

package moonbox.jdbc

import java.sql
import java.sql.{Blob, CallableStatement, Clob, Connection, DatabaseMetaData, NClob, PreparedStatement, SQLException, SQLWarning, SQLXML, Savepoint, Statement, Struct}
import java.util.Properties
import java.util.concurrent.Executor

import moonbox.client.{ClientOptions, MoonboxClient}

import scala.collection.JavaConverters._

/**
 * JDBC [[java.sql.Connection]] implementation backed by a [[MoonboxClient]].
 *
 * The connection is not usable until [[init]] has been called: `init` parses the
 * JDBC url, builds the client and creates the [[JdbcSession]] that every other
 * method delegates to. Transactions, savepoints, LOB factories and callable
 * statements are not supported; the corresponding methods are either no-ops or
 * throw `SQLException("Unsupported")`.
 *
 * @param url   the JDBC url this connection was requested with
 * @param props the caller-supplied connection properties
 */
class MoonboxConnection(url: String, props: Properties) extends java.sql.Connection {

  import MoonboxJDBCUtils._

  // Session state; stays null until init() succeeds and is reset to null by close().
  private var jdbcSession: JdbcSession = _
  // Only the most recently created statement is tracked here.
  private var statement: MoonboxStatement = _
  private var isLocal: Boolean = _
  private var _readOnly: Boolean = _

  /**
   * Parses the url, connects the underlying client and opens the session.
   *
   * @return always `true` when no exception is thrown
   * @throws SQLException if no host/port pair can be derived from the url or the
   *                      client cannot be built
   */
  def init(): Boolean = {
    val newProps = parseURL(url, props)
    val username = newProps.getProperty(USER_KEY)
    val pwd = newProps.getProperty(PASSWORD_KEY)
    isLocal = Option(newProps.getProperty(IS_LOCAL_KEY)).exists(_.toBoolean)
    // Only the first host/port pair is used. Fail with a SQLException rather than
    // leaking a NoSuchElementException when the list is empty.
    val (host, port) = parseHostsAndPorts(newProps.getProperty(HOSTS_AND_PORTS))
      .map { case (h, p) => (h, p.toInt) }
      .headOption
      .getOrElse(throw new SQLException("No host and port found in the JDBC url"))
    val clientOptions = ClientOptions.builder()
      .options(newProps.asScala.toMap)
      .host(host)
      .port(port)
      .user(username)
      .password(pwd)
      .isLocal(isLocal)
      .database(newProps.getProperty(DB_NAME, "default"))
      .build()
    val moonboxClient = try {
      MoonboxClient.builder(clientOptions).build()
    } catch {
      case e: Exception =>
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new SQLException(e.getMessage, e)
    }
    initSession(moonboxClient, clientOptions, newProps)
    true
  }

  /** Returns the JDBC url this connection was created with. */
  def getUrl(): String = url

  /** Returns the user name found in the caller-supplied properties (may be null). */
  def getUserName(): String = props.getProperty(USER_KEY)

  /**
   * Creates the session wrapper around the connected client.
   *
   * The previous implementation branched on whether a password was present, but
   * both branches built exactly the same session, so the check was removed.
   * NOTE(review): the literal "*" is passed where the session presumably expects
   * a password, i.e. it looks like a mask - confirm against JdbcSession.
   */
  private def initSession(moonboxClient: MoonboxClient, clientOptions: ClientOptions, props: Properties): Unit = {
    jdbcSession = JdbcSession(moonboxClient,
      clientOptions.database,
      clientOptions.user.get,
      "*",
      props,
      clientOptions.isLocal)
  }

  /** Returns the current session, or null before `init()` / after `close()`. */
  def getSession: JdbcSession = jdbcSession

  /* commit is unsupported: this is a no-op */
  override def commit(): Unit = {}

  // TODO: holdability is fixed; setHoldability is a no-op.
  override def getHoldability: Int = sql.ResultSet.HOLD_CURSORS_OVER_COMMIT

  /** Switches the client's current database and records it in the session. */
  override def setCatalog(catalog: String): Unit = {
    // TODO:
    jdbcSession.moonboxClient.setCurrentDatabase(catalog)
    jdbcSession = jdbcSession.copy(database = catalog)
  }

  override def setHoldability(holdability: Int): Unit = {}

  /** Creates a prepared statement on top of a freshly created statement. */
  override def prepareStatement(sql: String): PreparedStatement = new MoonboxPrepareStatement(createStatement(), sql)

  // The remaining prepareStatement overloads ignore their extra arguments.
  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int): PreparedStatement = prepareStatement(sql)

  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): PreparedStatement = prepareStatement(sql)

  override def prepareStatement(sql: String, autoGeneratedKeys: Int): PreparedStatement = prepareStatement(sql)

  override def prepareStatement(sql: String, columnIndexes: Array[Int]): PreparedStatement = prepareStatement(sql)

  override def prepareStatement(sql: String, columnNames: Array[String]): PreparedStatement = prepareStatement(sql)

  override def createClob(): Clob = throw new SQLException("Unsupported")

  /** Same effect as [[setCatalog]]: the schema is treated as the current database. */
  override def setSchema(schema: String): Unit = {
    jdbcSession.moonboxClient.setCurrentDatabase(schema)
    jdbcSession = jdbcSession.copy(database = schema)
  }

  override def setClientInfo(name: String, value: String): Unit = {}

  override def setClientInfo(properties: Properties): Unit = {}

  override def createSQLXML(): SQLXML = throw new SQLException("Unsupported")

  /** Returns the client's current database. */
  override def getCatalog: String = jdbcSession.moonboxClient.getCurrentDatabase

  override def createBlob(): Blob = throw new SQLException("Unsupported")

  /** Throws if the connection has not been initialised or has been closed. */
  def checkClosed(): Unit = {
    if (isClosed) throw new SQLException("Connection has already been closed.")
  }

  /**
   * Creates a statement and applies the max-rows, fetch-size and query-timeout
   * settings found in the connection properties.
   *
   * NOTE(review): the settings are read from the constructor's `props`, not from
   * the properties returned by `parseURL` in `init()` - confirm that is intended.
   *
   * @throws SQLException if the connection is closed
   */
  override def createStatement(): Statement = {
    checkClosed()
    val created = new MoonboxStatement(this)
    if (props.containsKey(MAX_ROWS)) {
      created.setMaxRows(props.getProperty(MAX_ROWS).toInt)
    }
    if (props.containsKey(FETCH_SIZE)) {
      created.setFetchSize(props.getProperty(FETCH_SIZE).toInt)
    }
    if (props.containsKey(READ_TIMEOUT)) {
      created.setQueryTimeout(props.getProperty(READ_TIMEOUT).toInt)
    }
    // Remember the statement so close() can close it.
    statement = created
    created
  }

  // Result-set type, concurrency and holdability arguments are ignored.
  override def createStatement(resultSetType: Int, resultSetConcurrency: Int): Statement = createStatement()

  override def createStatement(resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): Statement = createStatement()

  /** Closes the connection synchronously; the executor is not used. */
  override def abort(executor: Executor): Unit = this.close()

  override def setAutoCommit(autoCommit: Boolean): Unit = {}

  override def getMetaData: DatabaseMetaData = new MoonboxDatabaseMetaData(this)

  /** Records the flag; it is only reported back by `isReadOnly`. */
  override def setReadOnly(readOnly: Boolean): Unit = {
    _readOnly = readOnly
  }

  override def prepareCall(sql: String): CallableStatement = throw new SQLException("Unsupported")

  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int): CallableStatement = throw new SQLException("Unsupported")

  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): CallableStatement = throw new SQLException("Unsupported")

  override def setTransactionIsolation(level: Int): Unit = {}

  override def getWarnings: SQLWarning = throw new SQLException("Unsupported")

  override def releaseSavepoint(savepoint: Savepoint): Unit = throw new SQLException("Unsupported")

  override def nativeSQL(sql: String): String = throw new SQLException("Unsupported")

  override def isReadOnly: Boolean = _readOnly

  override def createArrayOf(typeName: String, elements: Array[AnyRef]): sql.Array = throw new SQLException("Unsupported")

  override def setSavepoint(): Savepoint = throw new SQLException("Unsupported")

  override def setSavepoint(name: String): Savepoint = throw new SQLException("Unsupported")

  /**
   * Closes the last created statement and the session, then drops both references.
   *
   * Safe to call on a connection that was never initialised and safe to call
   * more than once: the session is only touched when it is non-null. (The
   * previous `||` condition dereferenced a null session.)
   */
  override def close(): Unit = {
    if (statement != null && !statement.isClosed) {
      statement.close()
    }
    statement = null
    if (jdbcSession != null && !jdbcSession.isClosed) {
      jdbcSession.close()
    }
    jdbcSession = null
  }

  override def createNClob(): NClob = throw new SQLException("Unsupported")

  override def rollback(): Unit = {}

  override def rollback(savepoint: Savepoint): Unit = {}

  /** Forwards the timeout to the session's read timeout; the executor is not used. */
  override def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit = jdbcSession.setReadTimeout(milliseconds)

  override def setTypeMap(map: java.util.Map[String, Class[_]]): Unit = {}

  /** Reports validity purely from the closed state; `timeout` is not used. */
  override def isValid(timeout: Int): Boolean = !isClosed

  override def getAutoCommit: Boolean = false

  override def clearWarnings(): Unit = {}

  /** Returns the client's current database (same value as `getCatalog`). */
  override def getSchema: String = jdbcSession.moonboxClient.getCurrentDatabase

  override def getNetworkTimeout: Int = jdbcSession.getReadTimeout

  /** True when there is no session (not initialised / already closed) or the session is closed. */
  override def isClosed: Boolean = jdbcSession == null || jdbcSession.isClosed

  override def getTransactionIsolation: Int = Connection.TRANSACTION_NONE

  override def createStruct(typeName: String, attributes: Array[AnyRef]): Struct = throw new SQLException("Unsupported")

  override def getClientInfo(name: String): String = throw new SQLException("Unsupported")

  override def getClientInfo: Properties = throw new SQLException("Unsupported")

  override def getTypeMap: java.util.Map[String, Class[_]] = throw new SQLException("Unsupported")

  /** Returns this connection as `T` when it implements `iface`, otherwise throws. */
  override def unwrap[T](iface: Class[T]): T = {
    if (isWrapperFor(iface)) this.asInstanceOf[T]
    else throw new SQLException("unwrap exception")
  }

  override def isWrapperFor(iface: Class[_]): Boolean = iface != null && iface.isAssignableFrom(getClass)
}
*/
